/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "orc/Int128.hh"

#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "ConvertColumnReader.hh"
#include "RLE.hh"
#include "SchemaEvolution.hh"
#include "orc/Exceptions.hh"

#include <math.h>
#include <iostream>

namespace orc {

  StripeStreams::~StripeStreams() {
    // PASS
  }

  inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
    switch (static_cast<int64_t>(kind)) {
      case proto::ColumnEncoding_Kind_DIRECT:
      case proto::ColumnEncoding_Kind_DICTIONARY:
        return RleVersion_1;
      case proto::ColumnEncoding_Kind_DIRECT_V2:
      case proto::ColumnEncoding_Kind_DICTIONARY_V2:
        return RleVersion_2;
      default:
        throw ParseError("Unknown encoding in convertRleVersion");
    }
  }

  ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
      : columnId(type.getColumnId()),
        memoryPool(stripe.getMemoryPool()),
        metrics(stripe.getReaderMetrics()) {
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
    if (stream.get()) {
      notNullDecoder = createBooleanRleDecoder(std::move(stream), metrics);
    }
  }

  ColumnReader::~ColumnReader() {
    // PASS
  }

  uint64_t ColumnReader::skip(uint64_t numValues) {
    ByteRleDecoder* decoder = notNullDecoder.get();
    if (decoder) {
      // page through the values that we want to skip
      // and count how many are non-null
      const size_t MAX_BUFFER_SIZE = 32768;
      size_t bufferSize = std::min(MAX_BUFFER_SIZE, static_cast<size_t>(numValues));
      char buffer[MAX_BUFFER_SIZE];
      uint64_t remaining = numValues;
      while (remaining > 0) {
        uint64_t chunkSize = std::min(remaining, static_cast<uint64_t>(bufferSize));
        decoder->next(buffer, chunkSize, nullptr);
        remaining -= chunkSize;
        for (uint64_t i = 0; i < chunkSize; ++i) {
          if (!buffer[i]) {
            numValues -= 1;
          }
        }
      }
    }
    return numValues;
  }

  void ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* incomingMask) {
    if (numValues > rowBatch.capacity) {
      rowBatch.resize(numValues);
    }
    rowBatch.numElements = numValues;
    ByteRleDecoder* decoder = notNullDecoder.get();
    if (decoder) {
      char* notNullArray = rowBatch.notNull.data();
      decoder->next(notNullArray, numValues, incomingMask);
      // check to see if there are nulls in this batch
      for (uint64_t i = 0; i < numValues; ++i) {
        if (!notNullArray[i]) {
          rowBatch.hasNulls = true;
          return;
        }
      }
    } else if (incomingMask) {
      // If we don't have a notNull stream, copy the incomingMask
      rowBatch.hasNulls = true;
      memcpy(rowBatch.notNull.data(), incomingMask, numValues);
      return;
    }
    rowBatch.hasNulls = false;
  }

  void ColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
    if (notNullDecoder.get()) {
      notNullDecoder->seek(positions.at(columnId));
    }
  }

  /**
   * Expand an array of bytes in place to the corresponding array of integer.
   * Has to work backwards so that they data isn't clobbered during the
   * expansion.
   * @param buffer the array of chars and array of longs that need to be
   *        expanded
   * @param numValues the number of bytes to convert to longs
   */
  template <typename T>
  void expandBytesToIntegers(T* buffer, uint64_t numValues) {
    if (sizeof(T) == sizeof(int8_t)) {
      return;
    }
    for (uint64_t i = 0UL; i < numValues; ++i) {
      buffer[numValues - 1 - i] = reinterpret_cast<int8_t*>(buffer)[numValues - 1 - i];
    }
  }

  template <typename BatchType>
  class BooleanColumnReader : public ColumnReader {
   private:
    std::unique_ptr<orc::ByteRleDecoder> rle_;

   public:
    BooleanColumnReader(const Type& type, StripeStreams& stipe);
    ~BooleanColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  template <typename BatchType>
  BooleanColumnReader<BatchType>::BooleanColumnReader(const Type& type, StripeStreams& stripe)
      : ColumnReader(type, stripe) {
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) throw ParseError("DATA stream not found in Boolean column");
    rle_ = createBooleanRleDecoder(std::move(stream), metrics);
  }

  template <typename BatchType>
  BooleanColumnReader<BatchType>::~BooleanColumnReader() {
    // PASS
  }

  template <typename BatchType>
  uint64_t BooleanColumnReader<BatchType>::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle_->skip(numValues);
    return numValues;
  }

  template <typename BatchType>
  void BooleanColumnReader<BatchType>::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                            char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // Since the byte rle places the output in a char* and BatchType here may be
    // LongVectorBatch with long*. We cheat here in that case and use the long*
    // and then expand it in a second pass..
    auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
    rle_->next(reinterpret_cast<char*>(ptr), numValues,
               rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
    expandBytesToIntegers(ptr, numValues);
  }

  template <typename BatchType>
  void BooleanColumnReader<BatchType>::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle_->seek(positions.at(columnId));
  }

  template <typename BatchType>
  class ByteColumnReader : public ColumnReader {
   private:
    std::unique_ptr<orc::ByteRleDecoder> rle_;

   public:
    ByteColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
      std::unique_ptr<SeekableInputStream> stream =
          stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
      if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
      rle_ = createByteRleDecoder(std::move(stream), metrics);
    }

    ~ByteColumnReader() override = default;

    uint64_t skip(uint64_t numValues) override {
      numValues = ColumnReader::skip(numValues);
      rle_->skip(numValues);
      return numValues;
    }

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
      ColumnReader::next(rowBatch, numValues, notNull);
      // Since the byte rle places the output in a char* instead of long*,
      // we cheat here and use the long* and then expand it in a second pass.
      auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
      rle_->next(reinterpret_cast<char*>(ptr), numValues,
                 rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
      expandBytesToIntegers(ptr, numValues);
    }

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
      ColumnReader::seekToRowGroup(positions);
      rle_->seek(positions.at(columnId));
    }
  };

  template <typename BatchType>
  class IntegerColumnReader : public ColumnReader {
   protected:
    std::unique_ptr<orc::RleDecoder> rle;

   public:
    IntegerColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
      RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
      std::unique_ptr<SeekableInputStream> stream =
          stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
      if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
      rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
    }

    ~IntegerColumnReader() override {
      // PASS
    }

    uint64_t skip(uint64_t numValues) override {
      numValues = ColumnReader::skip(numValues);
      rle->skip(numValues);
      return numValues;
    }

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
      ColumnReader::next(rowBatch, numValues, notNull);
      rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
                rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
    }

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
      ColumnReader::seekToRowGroup(positions);
      rle->seek(positions.at(columnId));
    }
  };

  class TimestampColumnReader : public ColumnReader {
   private:
    std::unique_ptr<orc::RleDecoder> secondsRle_;
    std::unique_ptr<orc::RleDecoder> nanoRle_;
    const Timezone* writerTimezone_;
    const Timezone* readerTimezone_;
    const int64_t epochOffset_;
    const bool sameTimezone_;

   public:
    TimestampColumnReader(const Type& type, StripeStreams& stripe, bool isInstantType);
    ~TimestampColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  TimestampColumnReader::TimestampColumnReader(const Type& type, StripeStreams& stripe,
                                               bool isInstantType)
      : ColumnReader(type, stripe),
        writerTimezone_(isInstantType ? &getTimezoneByName("GMT") : &stripe.getWriterTimezone()),
        readerTimezone_(isInstantType ? &getTimezoneByName("GMT") : &stripe.getReaderTimezone()),
        epochOffset_(writerTimezone_->getEpoch()),
        sameTimezone_(writerTimezone_ == readerTimezone_) {
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) throw ParseError("DATA stream not found in Timestamp column");
    secondsRle_ = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
    stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
    if (stream == nullptr) throw ParseError("SECONDARY stream not found in Timestamp column");
    nanoRle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
  }

  TimestampColumnReader::~TimestampColumnReader() {
    // PASS
  }

  uint64_t TimestampColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    secondsRle_->skip(numValues);
    nanoRle_->skip(numValues);
    return numValues;
  }

  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    TimestampVectorBatch& timestampBatch = dynamic_cast<TimestampVectorBatch&>(rowBatch);
    int64_t* secsBuffer = timestampBatch.data.data();
    secondsRle_->next(secsBuffer, numValues, notNull);
    int64_t* nanoBuffer = timestampBatch.nanoseconds.data();
    nanoRle_->next(nanoBuffer, numValues, notNull);

    // Construct the values
    for (uint64_t i = 0; i < numValues; i++) {
      if (notNull == nullptr || notNull[i]) {
        uint64_t zeros = nanoBuffer[i] & 0x7;
        nanoBuffer[i] >>= 3;
        if (zeros != 0) {
          for (uint64_t j = 0; j <= zeros; ++j) {
            nanoBuffer[i] *= 10;
          }
        }
        int64_t writerTime = secsBuffer[i] + epochOffset_;
        if (!sameTimezone_) {
          // adjust timestamp value to same wall clock time if writer and reader
          // time zones have different rules, which is required for Apache Orc.
          const auto& wv = writerTimezone_->getVariant(writerTime);
          const auto& rv = readerTimezone_->getVariant(writerTime);
          if (!wv.hasSameTzRule(rv)) {
            // If the timezone adjustment moves the millis across a DST boundary,
            // we need to reevaluate the offsets.
            int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
            const auto& adjustedReader = readerTimezone_->getVariant(adjustedTime);
            writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
          }
        }
        secsBuffer[i] = writerTime;
        if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) {
          secsBuffer[i] -= 1;
        }
      }
    }
  }

  void TimestampColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    secondsRle_->seek(positions.at(columnId));
    nanoRle_->seek(positions.at(columnId));
  }

  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
  class DoubleColumnReader : public ColumnReader {
   public:
    DoubleColumnReader(const Type& type, StripeStreams& stripe);
    ~DoubleColumnReader() override {}

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;

   private:
    std::unique_ptr<SeekableInputStream> inputStream_;
    const uint64_t bytesPerValue_ = (columnKind == FLOAT) ? 4 : 8;
    const char* bufferPointer_;
    const char* bufferEnd_;

    unsigned char readByte() {
      if (bufferPointer_ == bufferEnd_) {
        int length;
        if (!inputStream_->Next(reinterpret_cast<const void**>(&bufferPointer_), &length)) {
          throw ParseError("bad read in DoubleColumnReader::next()");
        }
        bufferEnd_ = bufferPointer_ + length;
      }
      return static_cast<unsigned char>(*(bufferPointer_++));
    }

    template <typename FloatType>
    FloatType readDouble() {
      int64_t bits = 0;
      if (bufferEnd_ - bufferPointer_ >= 8) {
        if (isLittleEndian) {
          memcpy(&bits, bufferPointer_, sizeof(bits));
        } else {
          bits = static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[0]));
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[1])) << 8;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[2])) << 16;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[3])) << 24;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[4])) << 32;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[5])) << 40;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[6])) << 48;
          bits |= static_cast<int64_t>(static_cast<unsigned char>(bufferPointer_[7])) << 56;
        }
        bufferPointer_ += 8;
      } else {
        for (uint64_t i = 0; i < 8; i++) {
          bits |= static_cast<int64_t>(readByte()) << (i * 8);
        }
      }
      FloatType* result = reinterpret_cast<FloatType*>(&bits);
      return *result;
    }

    template <typename FloatType>
    FloatType readFloat() {
      int32_t bits = 0;
      if (bufferEnd_ - bufferPointer_ >= 4) {
        if (isLittleEndian) {
          bits = *(reinterpret_cast<const int32_t*>(bufferPointer_));
        } else {
          bits = static_cast<unsigned char>(bufferPointer_[0]);
          bits |= static_cast<unsigned char>(bufferPointer_[1]) << 8;
          bits |= static_cast<unsigned char>(bufferPointer_[2]) << 16;
          bits |= static_cast<unsigned char>(bufferPointer_[3]) << 24;
        }
        bufferPointer_ += 4;
      } else {
        for (uint64_t i = 0; i < 4; i++) {
          bits |= readByte() << (i * 8);
        }
      }
      float* result = reinterpret_cast<float*>(&bits);
      if (!result) {
        std::cerr << "read float empty." << std::endl;
      }
      return static_cast<FloatType>(*result);
    }
  };

  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
  DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::DoubleColumnReader(
      const Type& type, StripeStreams& stripe)
      : ColumnReader(type, stripe), bufferPointer_(nullptr), bufferEnd_(nullptr) {
    inputStream_ = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (inputStream_ == nullptr) throw ParseError("DATA stream not found in Double column");
  }

  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
  uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skip(
      uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);

    if (static_cast<size_t>(bufferEnd_ - bufferPointer_) >= bytesPerValue_ * numValues) {
      bufferPointer_ += bytesPerValue_ * numValues;
    } else {
      size_t sizeToSkip =
          bytesPerValue_ * numValues - static_cast<size_t>(bufferEnd_ - bufferPointer_);
      const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
      while (sizeToSkip != 0) {
        size_t step = sizeToSkip > cap ? cap : sizeToSkip;
        inputStream_->Skip(static_cast<int>(step));
        sizeToSkip -= step;
      }
      bufferEnd_ = nullptr;
      bufferPointer_ = nullptr;
    }

    return numValues;
  }

  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::next(
      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    ValueType* outArray =
        reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data());

    if constexpr (columnKind == FLOAT) {
      if (notNull) {
        for (size_t i = 0; i < numValues; ++i) {
          if (notNull[i]) {
            outArray[i] = readFloat<ValueType>();
          }
        }
      } else {
        for (size_t i = 0; i < numValues; ++i) {
          outArray[i] = readFloat<ValueType>();
        }
      }
    } else {
      if (notNull) {
        for (size_t i = 0; i < numValues; ++i) {
          if (notNull[i]) {
            outArray[i] = readDouble<ValueType>();
          }
        }
      } else {
        // Number of values in the buffer that we can copy directly.
        // Only viable when the machine is little-endian.
        uint64_t bufferNum = 0;
        if (isLittleEndian) {
          bufferNum = std::min(numValues,
                               static_cast<size_t>(bufferEnd_ - bufferPointer_) / bytesPerValue_);
          uint64_t bufferBytes = bufferNum * bytesPerValue_;
          if (bufferBytes > 0) {
            memcpy(outArray, bufferPointer_, bufferBytes);
            bufferPointer_ += bufferBytes;
          }
        }
        for (size_t i = bufferNum; i < numValues; ++i) {
          outArray[i] = readDouble<ValueType>();
        }
      }
    }
  }

  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    inputStream_->seek(positions.at(columnId));
    // clear buffer state after seek
    bufferEnd_ = nullptr;
    bufferPointer_ = nullptr;
  }

  void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
    int64_t posn = 0;
    while (posn < bufferSize) {
      const void* chunk;
      int length;
      if (!stream->Next(&chunk, &length)) {
        throw ParseError("bad read in readFully");
      }
      if (posn + length > bufferSize) {
        throw ParseError("Corrupt dictionary blob in StringDictionaryColumn");
      }
      memcpy(buffer + posn, chunk, static_cast<size_t>(length));
      posn += length;
    }
  }

  class StringDictionaryColumnReader : public ColumnReader {
   private:
    std::shared_ptr<StringDictionary> dictionary_;
    std::unique_ptr<RleDecoder> rle_;

   public:
    StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
    ~StringDictionaryColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
                                                             StripeStreams& stripe)
      : ColumnReader(type, stripe), dictionary_(new StringDictionary(stripe.getMemoryPool())) {
    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
    uint32_t dictSize = stripe.getEncoding(columnId).dictionary_size();
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) {
      throw ParseError("DATA stream not found in StringDictionaryColumn");
    }
    rle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
    stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
    if (dictSize > 0 && stream == nullptr) {
      throw ParseError("LENGTH stream not found in StringDictionaryColumn");
    }
    std::unique_ptr<RleDecoder> lengthDecoder =
        createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
    dictionary_->dictionaryOffset.resize(dictSize + 1);
    int64_t* lengthArray = dictionary_->dictionaryOffset.data();
    lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
    lengthArray[0] = 0;
    for (uint32_t i = 1; i < dictSize + 1; ++i) {
      if (lengthArray[i] < 0) {
        throw ParseError("Negative dictionary entry length");
      }
      lengthArray[i] += lengthArray[i - 1];
    }
    int64_t blobSize = lengthArray[dictSize];
    dictionary_->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
    std::unique_ptr<SeekableInputStream> blobStream =
        stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
    if (blobSize > 0 && blobStream == nullptr) {
      throw ParseError("DICTIONARY_DATA stream not found in StringDictionaryColumn");
    }
    readFully(dictionary_->dictionaryBlob.data(), blobSize, blobStream.get());
  }

  StringDictionaryColumnReader::~StringDictionaryColumnReader() {
    // PASS
  }

  uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    rle_->skip(numValues);
    return numValues;
  }

  void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                          char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    char* blob = dictionary_->dictionaryBlob.data();
    int64_t* dictionaryOffsets = dictionary_->dictionaryOffset.data();
    char** outputStarts = byteBatch.data.data();
    int64_t* outputLengths = byteBatch.length.data();
    rle_->next(outputLengths, numValues, notNull);
    uint64_t dictionaryCount = dictionary_->dictionaryOffset.size() - 1;
    if (notNull) {
      for (uint64_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          int64_t entry = outputLengths[i];
          if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
            throw ParseError("Entry index out of range in StringDictionaryColumn");
          }
          outputStarts[i] = blob + dictionaryOffsets[entry];
          outputLengths[i] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry];
        }
      }
    } else {
      for (uint64_t i = 0; i < numValues; ++i) {
        int64_t entry = outputLengths[i];
        if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
          throw ParseError("Entry index out of range in StringDictionaryColumn");
        }
        outputStarts[i] = blob + dictionaryOffsets[entry];
        outputLengths[i] = dictionaryOffsets[entry + 1] - dictionaryOffsets[entry];
      }
    }
  }

  void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                                 char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    rowBatch.isEncoded = true;

    EncodedStringVectorBatch& batch = dynamic_cast<EncodedStringVectorBatch&>(rowBatch);
    batch.dictionary = this->dictionary_;

    // Length buffer is reused to save dictionary entry ids
    rle_->next(batch.index.data(), numValues, notNull);
  }

  void StringDictionaryColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle_->seek(positions.at(columnId));
  }

  class StringDirectColumnReader : public ColumnReader {
   private:
    std::unique_ptr<RleDecoder> lengthRle_;
    std::unique_ptr<SeekableInputStream> blobStream_;
    const char* lastBuffer_;
    size_t lastBufferLength_;

    /**
     * Compute the total length of the values.
     * @param lengths the array of lengths
     * @param notNull the array of notNull flags
     * @param numValues the lengths of the arrays
     * @return the total number of bytes for the non-null values
     */
    size_t computeSize(const int64_t* lengths, const char* notNull, uint64_t numValues);

   public:
    StringDirectColumnReader(const Type& type, StripeStreams& stipe);
    ~StringDirectColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };

  StringDirectColumnReader::StringDirectColumnReader(const Type& type, StripeStreams& stripe)
      : ColumnReader(type, stripe) {
    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr) throw ParseError("LENGTH stream not found in StringDirectColumn");
    lengthRle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
    blobStream_ = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (blobStream_ == nullptr) throw ParseError("DATA stream not found in StringDirectColumn");
    lastBuffer_ = nullptr;
    lastBufferLength_ = 0;
  }

  StringDirectColumnReader::~StringDirectColumnReader() {
    // PASS
  }

  uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
    const size_t BUFFER_SIZE = 1024;
    numValues = ColumnReader::skip(numValues);
    int64_t buffer[BUFFER_SIZE];
    uint64_t done = 0;
    size_t totalBytes = 0;
    // read the lengths, so we know haw many bytes to skip
    while (done < numValues) {
      uint64_t step = std::min(BUFFER_SIZE, static_cast<size_t>(numValues - done));
      lengthRle_->next(buffer, step, nullptr);
      totalBytes += computeSize(buffer, nullptr, step);
      done += step;
    }
    if (totalBytes <= lastBufferLength_) {
      // subtract the needed bytes from the ones left over
      lastBufferLength_ -= totalBytes;
      lastBuffer_ += totalBytes;
    } else {
      // move the stream forward after accounting for the buffered bytes
      totalBytes -= lastBufferLength_;
      const size_t cap = static_cast<size_t>(std::numeric_limits<int>::max());
      while (totalBytes != 0) {
        size_t step = totalBytes > cap ? cap : totalBytes;
        blobStream_->Skip(static_cast<int>(step));
        totalBytes -= step;
      }
      lastBufferLength_ = 0;
      lastBuffer_ = nullptr;
    }
    return numValues;
  }

  size_t StringDirectColumnReader::computeSize(const int64_t* lengths, const char* notNull,
                                               uint64_t numValues) {
    size_t totalLength = 0;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          totalLength += static_cast<size_t>(lengths[i]);
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        totalLength += static_cast<size_t>(lengths[i]);
      }
    }
    return totalLength;
  }

  void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                      char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    char** startPtr = byteBatch.data.data();
    int64_t* lengthPtr = byteBatch.length.data();

    // read the length vector
    lengthRle_->next(lengthPtr, numValues, notNull);

    // figure out the total length of data we need from the blob stream
    const size_t totalLength = computeSize(lengthPtr, notNull, numValues);

    // Load data from the blob stream into our buffer until we have enough
    // to get the rest directly out of the stream's buffer.
    size_t bytesBuffered = 0;
    byteBatch.blob.resize(totalLength);
    char* ptr = byteBatch.blob.data();
    while (bytesBuffered + lastBufferLength_ < totalLength) {
      memcpy(ptr + bytesBuffered, lastBuffer_, lastBufferLength_);
      bytesBuffered += lastBufferLength_;
      const void* readBuffer;
      int readLength;
      if (!blobStream_->Next(&readBuffer, &readLength)) {
        throw ParseError("failed to read in StringDirectColumnReader.next");
      }
      lastBuffer_ = static_cast<const char*>(readBuffer);
      lastBufferLength_ = static_cast<size_t>(readLength);
    }

    if (bytesBuffered < totalLength) {
      size_t moreBytes = totalLength - bytesBuffered;
      memcpy(ptr + bytesBuffered, lastBuffer_, moreBytes);
      lastBuffer_ += moreBytes;
      lastBufferLength_ -= moreBytes;
    }

    size_t filledSlots = 0;
    ptr = byteBatch.blob.data();
    if (notNull) {
      while (filledSlots < numValues) {
        if (notNull[filledSlots]) {
          startPtr[filledSlots] = const_cast<char*>(ptr);
          ptr += lengthPtr[filledSlots];
        }
        filledSlots += 1;
      }
    } else {
      while (filledSlots < numValues) {
        startPtr[filledSlots] = const_cast<char*>(ptr);
        ptr += lengthPtr[filledSlots];
        filledSlots += 1;
      }
    }
  }

  void StringDirectColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    blobStream_->seek(positions.at(columnId));
    lengthRle_->seek(positions.at(columnId));
    // clear buffer state after seek
    lastBuffer_ = nullptr;
    lastBufferLength_ = 0;
  }

  class StructColumnReader : public ColumnReader {
   private:
    std::vector<std::unique_ptr<ColumnReader>> children_;

   public:
    StructColumnReader(const Type& type, StripeStreams& stripe, bool useTightNumericVector = false,
                       bool throwOnSchemaEvolutionOverflow = false);

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;

   private:
    template <bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
  };

  StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe,
                                         bool useTightNumericVector,
                                         bool throwOnSchemaEvolutionOverflow)
      : ColumnReader(type, stripe) {
    // count the number of selected sub-columns
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
      case proto::ColumnEncoding_Kind_DIRECT:
        for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
          const Type& child = *type.getSubtype(i);
          if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
            children_.push_back(
                buildReader(child, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow));
          }
        }
        break;
      case proto::ColumnEncoding_Kind_DIRECT_V2:
      case proto::ColumnEncoding_Kind_DICTIONARY:
      case proto::ColumnEncoding_Kind_DICTIONARY_V2:
      default:
        throw ParseError("Unknown encoding for StructColumnReader");
    }
  }

  uint64_t StructColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    for (auto& ptr : children_) {
      ptr->skip(numValues);
    }
    return numValues;
  }

  void StructColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                       char* notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template <bool encoded>
  void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                        char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    uint64_t i = 0;
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    for (auto iter = children_.begin(); iter != children_.end(); ++iter, ++i) {
      if (encoded) {
        (*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues,
                             notNull);
      } else {
        (*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), numValues, notNull);
      }
    }
  }

  void StructColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);

    for (auto& ptr : children_) {
      ptr->seekToRowGroup(positions);
    }
  }

  class ListColumnReader : public ColumnReader {
   private:
    std::unique_ptr<ColumnReader> child_;
    std::unique_ptr<RleDecoder> rle_;

   public:
    ListColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false,
                     bool throwOnSchemaEvolutionOverflow = false);
    ~ListColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;

   private:
    template <bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
  };

  ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
                                     bool useTightNumericVector,
                                     bool throwOnSchemaEvolutionOverflow)
      : ColumnReader(type, stripe) {
    // count the number of selected sub-columns
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr) throw ParseError("LENGTH stream not found in List column");
    rle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
    const Type& childType = *type.getSubtype(0);
    if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
      child_ =
          buildReader(childType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
    }
  }

  ListColumnReader::~ListColumnReader() {
    // PASS
  }

  uint64_t ListColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    ColumnReader* childReader = child_.get();
    if (childReader) {
      const uint64_t BUFFER_SIZE = 1024;
      int64_t buffer[BUFFER_SIZE];
      uint64_t childrenElements = 0;
      uint64_t lengthsRead = 0;
      while (lengthsRead < numValues) {
        uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
        rle_->next(buffer, chunk, nullptr);
        for (size_t i = 0; i < chunk; ++i) {
          childrenElements += static_cast<size_t>(buffer[i]);
        }
        lengthsRead += chunk;
      }
      childReader->skip(childrenElements);
    } else {
      rle_->skip(numValues);
    }
    return numValues;
  }

  void ListColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                     char* notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template <bool encoded>
  void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                      char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
    int64_t* offsets = listBatch.offsets.data();
    notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
    rle_->next(offsets, numValues, notNull);
    uint64_t totalChildren = 0;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          uint64_t tmp = static_cast<uint64_t>(offsets[i]);
          offsets[i] = static_cast<int64_t>(totalChildren);
          totalChildren += tmp;
        } else {
          offsets[i] = static_cast<int64_t>(totalChildren);
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        uint64_t tmp = static_cast<uint64_t>(offsets[i]);
        offsets[i] = static_cast<int64_t>(totalChildren);
        totalChildren += tmp;
      }
    }
    offsets[numValues] = static_cast<int64_t>(totalChildren);
    ColumnReader* childReader = child_.get();
    if (childReader) {
      if (encoded) {
        childReader->nextEncoded(*(listBatch.elements.get()), totalChildren, nullptr);
      } else {
        childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
      }
    }
  }

  void ListColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle_->seek(positions.at(columnId));
    if (child_.get()) {
      child_->seekToRowGroup(positions);
    }
  }

  class MapColumnReader : public ColumnReader {
   private:
    std::unique_ptr<ColumnReader> keyReader_;
    std::unique_ptr<ColumnReader> elementReader_;
    std::unique_ptr<RleDecoder> rle_;

   public:
    MapColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false,
                    bool throwOnSchemaEvolutionOverflow = false);
    ~MapColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;

   private:
    template <bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
  };

  MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
                                   bool useTightNumericVector, bool throwOnSchemaEvolutionOverflow)
      : ColumnReader(type, stripe) {
    // Determine if the key and/or value columns are selected
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
    if (stream == nullptr) throw ParseError("LENGTH stream not found in Map column");
    rle_ = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
    const Type& keyType = *type.getSubtype(0);
    if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
      keyReader_ =
          buildReader(keyType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
    }
    const Type& elementType = *type.getSubtype(1);
    if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
      elementReader_ =
          buildReader(elementType, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
    }
  }

  MapColumnReader::~MapColumnReader() {
    // PASS
  }

  uint64_t MapColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    ColumnReader* rawKeyReader = keyReader_.get();
    ColumnReader* rawElementReader = elementReader_.get();
    if (rawKeyReader || rawElementReader) {
      const uint64_t BUFFER_SIZE = 1024;
      int64_t buffer[BUFFER_SIZE];
      uint64_t childrenElements = 0;
      uint64_t lengthsRead = 0;
      while (lengthsRead < numValues) {
        uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
        rle_->next(buffer, chunk, nullptr);
        for (size_t i = 0; i < chunk; ++i) {
          childrenElements += static_cast<size_t>(buffer[i]);
        }
        lengthsRead += chunk;
      }
      if (rawKeyReader) {
        rawKeyReader->skip(childrenElements);
      }
      if (rawElementReader) {
        rawElementReader->skip(childrenElements);
      }
    } else {
      rle_->skip(numValues);
    }
    return numValues;
  }

  void MapColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                    char* notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template <bool encoded>
  void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                     char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
    int64_t* offsets = mapBatch.offsets.data();
    notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
    rle_->next(offsets, numValues, notNull);
    uint64_t totalChildren = 0;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          uint64_t tmp = static_cast<uint64_t>(offsets[i]);
          offsets[i] = static_cast<int64_t>(totalChildren);
          totalChildren += tmp;
        } else {
          offsets[i] = static_cast<int64_t>(totalChildren);
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        uint64_t tmp = static_cast<uint64_t>(offsets[i]);
        offsets[i] = static_cast<int64_t>(totalChildren);
        totalChildren += tmp;
      }
    }
    offsets[numValues] = static_cast<int64_t>(totalChildren);
    ColumnReader* rawKeyReader = keyReader_.get();
    if (rawKeyReader) {
      if (encoded) {
        rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren, nullptr);
      } else {
        rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
      }
    }
    ColumnReader* rawElementReader = elementReader_.get();
    if (rawElementReader) {
      if (encoded) {
        rawElementReader->nextEncoded(*(mapBatch.elements.get()), totalChildren, nullptr);
      } else {
        rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr);
      }
    }
  }

  void MapColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle_->seek(positions.at(columnId));
    if (keyReader_.get()) {
      keyReader_->seekToRowGroup(positions);
    }
    if (elementReader_.get()) {
      elementReader_->seekToRowGroup(positions);
    }
  }

  class UnionColumnReader : public ColumnReader {
   private:
    std::unique_ptr<ByteRleDecoder> rle_;
    std::vector<std::unique_ptr<ColumnReader>> childrenReader_;
    std::vector<int64_t> childrenCounts_;
    uint64_t numChildren_;

   public:
    UnionColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false,
                      bool throwOnSchemaEvolutionOverflow = false);

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;

   private:
    template <bool encoded>
    void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
  };

  UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
                                       bool useTightNumericVector,
                                       bool throwOnSchemaEvolutionOverflow)
      : ColumnReader(type, stripe) {
    numChildren_ = type.getSubtypeCount();
    childrenReader_.resize(numChildren_);
    childrenCounts_.resize(numChildren_);

    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) throw ParseError("LENGTH stream not found in Union column");
    rle_ = createByteRleDecoder(std::move(stream), metrics);
    // figure out which types are selected
    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
    for (unsigned int i = 0; i < numChildren_; ++i) {
      const Type& child = *type.getSubtype(i);
      if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
        childrenReader_[i] =
            buildReader(child, stripe, useTightNumericVector, throwOnSchemaEvolutionOverflow);
      }
    }
  }

  uint64_t UnionColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    const uint64_t BUFFER_SIZE = 1024;
    char buffer[BUFFER_SIZE];
    uint64_t lengthsRead = 0;
    int64_t* counts = childrenCounts_.data();
    memset(counts, 0, sizeof(int64_t) * numChildren_);
    while (lengthsRead < numValues) {
      uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
      rle_->next(buffer, chunk, nullptr);
      for (size_t i = 0; i < chunk; ++i) {
        counts[static_cast<size_t>(buffer[i])] += 1;
      }
      lengthsRead += chunk;
    }
    for (size_t i = 0; i < numChildren_; ++i) {
      if (counts[i] != 0 && childrenReader_[i] != nullptr) {
        childrenReader_[i]->skip(static_cast<uint64_t>(counts[i]));
      }
    }
    return numValues;
  }

  void UnionColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    nextInternal<false>(rowBatch, numValues, notNull);
  }

  void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                      char* notNull) {
    nextInternal<true>(rowBatch, numValues, notNull);
  }

  template <bool encoded>
  void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                       char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    UnionVectorBatch& unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
    uint64_t* offsets = unionBatch.offsets.data();
    int64_t* counts = childrenCounts_.data();
    memset(counts, 0, sizeof(int64_t) * numChildren_);
    unsigned char* tags = unionBatch.tags.data();
    notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr;
    rle_->next(reinterpret_cast<char*>(tags), numValues, notNull);
    // set the offsets for each row
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          offsets[i] = static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        offsets[i] = static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
      }
    }
    // read the right number of each child column
    for (size_t i = 0; i < numChildren_; ++i) {
      if (childrenReader_[i] != nullptr) {
        if (encoded) {
          childrenReader_[i]->nextEncoded(*(unionBatch.children[i]),
                                          static_cast<uint64_t>(counts[i]), nullptr);
        } else {
          childrenReader_[i]->next(*(unionBatch.children[i]), static_cast<uint64_t>(counts[i]),
                                   nullptr);
        }
      }
    }
  }

  void UnionColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    rle_->seek(positions.at(columnId));
    for (size_t i = 0; i < numChildren_; ++i) {
      if (childrenReader_[i] != nullptr) {
        childrenReader_[i]->seekToRowGroup(positions);
      }
    }
  }

  /**
   * Destructively convert the number from zigzag encoding to the
   * natural signed representation.
   */
  void unZigZagInt128(Int128& value) {
    bool needsNegate = value.getLowBits() & 1;
    value >>= 1;
    if (needsNegate) {
      value.negate();
      value -= 1;
    }
  }

  class Decimal64ColumnReader : public ColumnReader {
   public:
    static const uint32_t MAX_PRECISION_64 = 18;
    static const uint32_t MAX_PRECISION_128 = 38;
    static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];

   protected:
    std::unique_ptr<SeekableInputStream> valueStream;
    int32_t precision;
    int32_t scale;
    const char* buffer;
    const char* bufferEnd;

    std::unique_ptr<RleDecoder> scaleDecoder;

    /**
     * Read the valueStream for more bytes.
     */
    void readBuffer() {
      while (buffer == bufferEnd) {
        int length;
        if (!valueStream->Next(reinterpret_cast<const void**>(&buffer), &length)) {
          throw ParseError("Read past end of stream in Decimal64ColumnReader " +
                           valueStream->getName());
        }
        bufferEnd = buffer + length;
      }
    }

    void readInt64(int64_t& value, int32_t currentScale) {
      value = 0;
      size_t offset = 0;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        value |= static_cast<uint64_t>(ch & 0x7f) << offset;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }
      value = unZigZag(static_cast<uint64_t>(value));
      if (scale > currentScale && static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) {
        value *= POWERS_OF_TEN[scale - currentScale];
      } else if (scale < currentScale &&
                 static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) {
        value /= POWERS_OF_TEN[currentScale - scale];
      } else if (scale != currentScale) {
        throw ParseError("Decimal scale out of range");
      }
    }

   public:
    Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
    ~Decimal64ColumnReader() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
  };
  const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
  const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
  const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1] = {1,
                                                                              10,
                                                                              100,
                                                                              1000,
                                                                              10000,
                                                                              100000,
                                                                              1000000,
                                                                              10000000,
                                                                              100000000,
                                                                              1000000000,
                                                                              10000000000,
                                                                              100000000000,
                                                                              1000000000000,
                                                                              10000000000000,
                                                                              100000000000000,
                                                                              1000000000000000,
                                                                              10000000000000000,
                                                                              100000000000000000,
                                                                              1000000000000000000};

  Decimal64ColumnReader::Decimal64ColumnReader(const Type& type, StripeStreams& stripe)
      : ColumnReader(type, stripe) {
    scale = static_cast<int32_t>(type.getScale());
    precision = static_cast<int32_t>(type.getPrecision());
    valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (valueStream == nullptr) throw ParseError("DATA stream not found in Decimal64Column");
    buffer = nullptr;
    bufferEnd = nullptr;
    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
    if (stream == nullptr) throw ParseError("SECONDARY stream not found in Decimal64Column");
    scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
  }

  Decimal64ColumnReader::~Decimal64ColumnReader() {
    // PASS
  }

  uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    uint64_t skipped = 0;
    while (skipped < numValues) {
      readBuffer();
      if (!(0x80 & *(buffer++))) {
        skipped += 1;
      }
    }
    scaleDecoder->skip(numValues);
    return numValues;
  }

  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch);
    int64_t* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();
    scaleDecoder->next(scaleBuffer, numValues, notNull);
    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
      }
    }
  }

  void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
    if (scale > currentScale) {
      while (scale > currentScale) {
        uint32_t scaleAdjust =
            std::min(Decimal64ColumnReader::MAX_PRECISION_64, scale - currentScale);
        value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
        currentScale += scaleAdjust;
      }
    } else if (scale < currentScale) {
      Int128 remainder;
      while (currentScale > scale) {
        uint32_t scaleAdjust =
            std::min(Decimal64ColumnReader::MAX_PRECISION_64, currentScale - scale);
        value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust], remainder);
        currentScale -= scaleAdjust;
      }
    }
  }

  void Decimal64ColumnReader::seekToRowGroup(
      std::unordered_map<uint64_t, PositionProvider>& positions) {
    ColumnReader::seekToRowGroup(positions);
    valueStream->seek(positions.at(columnId));
    scaleDecoder->seek(positions.at(columnId));
    // clear buffer state after seek
    buffer = nullptr;
    bufferEnd = nullptr;
  }

  class Decimal128ColumnReader : public Decimal64ColumnReader {
   public:
    Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
    ~Decimal128ColumnReader() override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;

   private:
    void readInt128(Int128& value, int32_t currentScale) {
      value = 0;
      Int128 work;
      uint32_t offset = 0;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        work = ch & 0x7f;
        work <<= offset;
        value |= work;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }
      unZigZagInt128(value);
      scaleInt128(value, static_cast<uint32_t>(scale), static_cast<uint32_t>(currentScale));
    }
  };

  Decimal128ColumnReader::Decimal128ColumnReader(const Type& type, StripeStreams& stripe)
      : Decimal64ColumnReader(type, stripe) {
    // PASS
  }

  Decimal128ColumnReader::~Decimal128ColumnReader() {
    // PASS
  }

  void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                    char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch);
    Int128* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();
    scaleDecoder->next(scaleBuffer, numValues, notNull);
    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
      }
    }
  }

  class Decimal64ColumnReaderV2 : public ColumnReader {
   protected:
    std::unique_ptr<RleDecoder> valueDecoder;
    int32_t precision;
    int32_t scale;

   public:
    Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
    ~Decimal64ColumnReaderV2() override;

    uint64_t skip(uint64_t numValues) override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
  };

  Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe)
      : ColumnReader(type, stripe) {
    scale = static_cast<int32_t>(type.getScale());
    precision = static_cast<int32_t>(type.getPrecision());
    std::unique_ptr<SeekableInputStream> stream =
        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
    if (stream == nullptr) {
      std::stringstream ss;
      ss << "DATA stream not found in Decimal64V2 column. ColumnId=" << columnId;
      throw ParseError(ss.str());
    }
    valueDecoder = createRleDecoder(std::move(stream), true, RleVersion_2, memoryPool, metrics);
  }

  Decimal64ColumnReaderV2::~Decimal64ColumnReaderV2() {
    // PASS
  }

  uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
    numValues = ColumnReader::skip(numValues);
    valueDecoder->skip(numValues);
    return numValues;
  }

  void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                     char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal64VectorBatch& batch = dynamic_cast<Decimal64VectorBatch&>(rowBatch);
    valueDecoder->next(batch.values.data(), numValues, notNull);
    batch.precision = precision;
    batch.scale = scale;
  }

  class DecimalHive11ColumnReader : public Decimal64ColumnReader {
   private:
    bool throwOnOverflow_;
    std::ostream* errorStream_;

    /**
     * Read an Int128 from the stream and correct it to the desired scale.
     */
    bool readInt128(Int128& value, int32_t currentScale) {
      // -/+ 99999999999999999999999999999999999999
      static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
      static const Int128 MAX_VALUE(0x4b3b4ca85a86c47a, 0x098a223fffffffff);

      value = 0;
      Int128 work;
      uint32_t offset = 0;
      bool result = true;
      while (true) {
        readBuffer();
        unsigned char ch = static_cast<unsigned char>(*(buffer++));
        work = ch & 0x7f;
        // If we have read more than 128 bits, we flag the error, but keep
        // reading bytes so the stream isn't thrown off.
        if (offset > 128 || (offset == 126 && work > 3)) {
          result = false;
        }
        work <<= offset;
        value |= work;
        offset += 7;
        if (!(ch & 0x80)) {
          break;
        }
      }

      if (!result) {
        return result;
      }
      unZigZagInt128(value);
      scaleInt128(value, static_cast<uint32_t>(scale), static_cast<uint32_t>(currentScale));
      return value >= MIN_VALUE && value <= MAX_VALUE;
    }

   public:
    DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
    ~DecimalHive11ColumnReader() override;

    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
  };

  DecimalHive11ColumnReader::DecimalHive11ColumnReader(const Type& type, StripeStreams& stripe)
      : Decimal64ColumnReader(type, stripe) {
    scale = stripe.getForcedScaleOnHive11Decimal();
    throwOnOverflow_ = stripe.getThrowOnHive11DecimalOverflow();
    errorStream_ = stripe.getErrorStream();
  }

  DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
    // PASS
  }

  void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues,
                                       char* notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    Decimal128VectorBatch& batch = dynamic_cast<Decimal128VectorBatch&>(rowBatch);
    Int128* values = batch.values.data();
    // read the next group of scales
    int64_t* scaleBuffer = batch.readScales.data();

    scaleDecoder->next(scaleBuffer, numValues, notNull);

    batch.precision = precision;
    batch.scale = scale;
    if (notNull) {
      for (size_t i = 0; i < numValues; ++i) {
        if (notNull[i]) {
          if (!readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]))) {
            if (throwOnOverflow_) {
              throw ParseError("Hive 0.11 decimal was more than 38 digits.");
            } else {
              *errorStream_ << "Warning: "
                            << "Hive 0.11 decimal with more than 38 digits "
                            << "replaced by NULL.\n";
              notNull[i] = false;
            }
          }
        }
      }
    } else {
      for (size_t i = 0; i < numValues; ++i) {
        if (!readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]))) {
          if (throwOnOverflow_) {
            throw ParseError("Hive 0.11 decimal was more than 38 digits.");
          } else {
            *errorStream_ << "Warning: "
                          << "Hive 0.11 decimal with more than 38 digits "
                          << "replaced by NULL.\n";
            batch.hasNulls = true;
            batch.notNull[i] = false;
          }
        }
      }
    }
  }

  static bool isLittleEndian() {
    static union {
      uint32_t i;
      char c[4];
    } num = {0x01020304};
    return num.c[0] == 4;
  }

  /**
   * Create a reader for the given stripe.
   */
  std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
                                            bool useTightNumericVector,
                                            bool throwOnSchemaEvolutionOverflow,
                                            bool convertToReadType) {
    if (convertToReadType && stripe.getSchemaEvolution() &&
        stripe.getSchemaEvolution()->needConvert(type)) {
      return buildConvertReader(type, stripe, useTightNumericVector,
                                throwOnSchemaEvolutionOverflow);
    }

    switch (static_cast<int64_t>(type.getKind())) {
      case SHORT:
        if (useTightNumericVector) {
          return std::make_unique<IntegerColumnReader<ShortVectorBatch>>(type, stripe);
        }
        return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
      case INT:
        if (useTightNumericVector) {
          return std::make_unique<IntegerColumnReader<IntVectorBatch>>(type, stripe);
        }
        return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
      case LONG:
      case DATE:
        return std::make_unique<IntegerColumnReader<LongVectorBatch>>(type, stripe);
      case BINARY:
      case CHAR:
      case STRING:
      case VARCHAR:
        switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())) {
          case proto::ColumnEncoding_Kind_DICTIONARY:
          case proto::ColumnEncoding_Kind_DICTIONARY_V2:
            return std::make_unique<StringDictionaryColumnReader>(type, stripe);
          case proto::ColumnEncoding_Kind_DIRECT:
          case proto::ColumnEncoding_Kind_DIRECT_V2:
            return std::make_unique<StringDirectColumnReader>(type, stripe);
          default:
            throw NotImplementedYet("buildReader unhandled string encoding");
        }

      case BOOLEAN: {
        if (useTightNumericVector) {
          return std::make_unique<BooleanColumnReader<ByteVectorBatch>>(type, stripe);
        } else {
          return std::make_unique<BooleanColumnReader<LongVectorBatch>>(type, stripe);
        }
      }

      case BYTE:
        if (useTightNumericVector) {
          return std::make_unique<ByteColumnReader<ByteVectorBatch>>(type, stripe);
        }
        return std::make_unique<ByteColumnReader<LongVectorBatch>>(type, stripe);

      case LIST:
        return std::make_unique<ListColumnReader>(type, stripe, useTightNumericVector,
                                                  throwOnSchemaEvolutionOverflow);

      case MAP:
        return std::make_unique<MapColumnReader>(type, stripe, useTightNumericVector,
                                                 throwOnSchemaEvolutionOverflow);

      case UNION:
        return std::make_unique<UnionColumnReader>(type, stripe, useTightNumericVector,
                                                   throwOnSchemaEvolutionOverflow);

      case STRUCT:
        return std::make_unique<StructColumnReader>(type, stripe, useTightNumericVector,
                                                    throwOnSchemaEvolutionOverflow);

      case FLOAT: {
        if (useTightNumericVector) {
          if (isLittleEndian()) {
            return std::make_unique<DoubleColumnReader<FLOAT, true, float, FloatVectorBatch>>(
                type, stripe);
          }
          return std::make_unique<DoubleColumnReader<FLOAT, false, float, FloatVectorBatch>>(
              type, stripe);
        }
        if (isLittleEndian()) {
          return std::make_unique<DoubleColumnReader<FLOAT, true, double, DoubleVectorBatch>>(
              type, stripe);
        }
        return std::make_unique<DoubleColumnReader<FLOAT, false, double, DoubleVectorBatch>>(
            type, stripe);
      }
      case DOUBLE: {
        if (isLittleEndian()) {
          return std::make_unique<DoubleColumnReader<DOUBLE, true, double, DoubleVectorBatch>>(
              type, stripe);
        }
        return std::make_unique<DoubleColumnReader<DOUBLE, false, double, DoubleVectorBatch>>(
            type, stripe);
      }
      case TIMESTAMP:
        return std::make_unique<TimestampColumnReader>(type, stripe, false);

      case TIMESTAMP_INSTANT:
        return std::make_unique<TimestampColumnReader>(type, stripe, true);

      case DECIMAL:
        // is this a Hive 0.11 or 0.12 file?
        if (type.getPrecision() == 0) {
          return std::make_unique<DecimalHive11ColumnReader>(type, stripe);
        }
        // can we represent the values using int64_t?
        if (type.getPrecision() <= Decimal64ColumnReader::MAX_PRECISION_64) {
          if (stripe.isDecimalAsLong()) {
            return std::make_unique<Decimal64ColumnReaderV2>(type, stripe);
          }
          return std::make_unique<Decimal64ColumnReader>(type, stripe);
        }
        // otherwise we use the Int128 implementation
        return std::make_unique<Decimal128ColumnReader>(type, stripe);

      default:
        throw NotImplementedYet("buildReader unhandled type");
    }
  }

}  // namespace orc
